package org.codehaus.activemq.store.jdbm;

import java.io.IOException;
import javax.jms.JMSException;
import jdbm.btree.BTree;
import jdbm.helper.Tuple;
import jdbm.helper.TupleBrowser;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.activemq.AlreadyClosedException;
import org.codehaus.activemq.message.ActiveMQMessage;
import org.codehaus.activemq.message.MessageAck;
import org.codehaus.activemq.service.MessageContainer;
import org.codehaus.activemq.service.MessageIdentity;
import org.codehaus.activemq.service.QueueMessageContainer;
import org.codehaus.activemq.service.impl.MessageEntry;
import org.codehaus.activemq.store.MessageStore;
import org.codehaus.activemq.util.JMSExceptionHelper;

/**
 * A {@link MessageStore} backed by two JDBM {@link BTree}s:
 * <ul>
 *   <li>the <em>message table</em>, keyed by JMS message ID and holding a
 *       {@link MessageEntry} per message;</li>
 *   <li>the <em>ordered index</em>, keyed by a monotonically increasing
 *       {@link Long} sequence number and holding the message ID.</li>
 * </ul>
 * All public store operations are {@code synchronized} on this instance.
 * After {@link #stop()} both tables are released and any further access
 * through {@link #getMessageTable()} / {@link #getOrderedIndex()} throws
 * {@link AlreadyClosedException}.
 */
public class JdbmMessageStore
  implements MessageStore
{
  private static final Log log = LogFactory.getLog(JdbmMessageStore.class);

  private MessageContainer container;
  private BTree messageTable;
  private BTree orderedIndex;
  /** Highest sequence number handed out so far; 0 means none yet. */
  private long lastSequenceNumber = 0L;

  /**
   * @param messageTable BTree mapping message ID to {@link MessageEntry}
   * @param orderedIndex BTree mapping sequence number to message ID
   */
  public JdbmMessageStore(BTree messageTable, BTree orderedIndex) {
    this.messageTable = messageTable;
    this.orderedIndex = orderedIndex;
  }

  /**
   * Sets the container this store belongs to.
   *
   * @param container the owning message container
   */
  public void setMessageContainer(MessageContainer container) {
    this.container = container;
  }

  /**
   * Stores the message and registers it in the ordered index under the next
   * sequence number.
   *
   * @param message the message to persist
   * @return the message's identity, updated with the assigned sequence number
   * @throws JMSException if either table cannot be written, or the store is closed
   */
  public synchronized MessageIdentity addMessage(ActiveMQMessage message) throws JMSException {
    if (log.isDebugEnabled()) {
      log.debug("Adding message to container: " + message);
    }
    MessageEntry entry = new MessageEntry(message);
    // The method itself holds the monitor, so no extra locking is needed here.
    Object sequenceNumber = new Long(++this.lastSequenceNumber);
    try {
      String messageID = message.getJMSMessageID();
      getMessageTable().insert(messageID, entry, true);
      getOrderedIndex().insert(sequenceNumber, messageID, true);

      MessageIdentity answer = message.getJMSMessageIdentity();
      answer.setSequenceNumber(sequenceNumber);
      return answer;
    }
    catch (IOException e) {
      throw JMSExceptionHelper.newJMSException("Failed to add message: " + message + " in container: " + e, e);
    }
  }

  /**
   * Looks up a message by the message ID carried in the given identity.
   *
   * @param identity identity whose message ID is used as the lookup key
   * @return the stored message with its identity's sequence number set from
   *         {@code identity}, or {@code null} if no entry exists for that ID
   * @throws JMSException if the message table cannot be read, or the store is closed
   */
  public synchronized ActiveMQMessage getMessage(MessageIdentity identity) throws JMSException
  {
    String messageID = identity.getMessageID();
    ActiveMQMessage message = null;
    try {
      MessageEntry entry = (MessageEntry) getMessageTable().find(messageID);
      if (entry != null) {
        message = entry.getMessage();
        message.getJMSMessageIdentity().setSequenceNumber(identity.getSequenceNumber());
      }
    }
    catch (IOException e) {
      throw JMSExceptionHelper.newJMSException("Failed to get message for messageID: " + messageID + " " + e, e);
    }
    return message;
  }

  /**
   * Removes a message from the message table and from the ordered index.
   * If the identity carries no sequence number, the ordered index is scanned
   * to find it (see {@link #findSequenceNumber(String)}).
   *
   * @param identity identity of the message to remove; must have a message ID
   * @param ack      the acknowledgement that triggered the removal (not used here)
   * @throws JMSException if the message ID is {@code null}, a table cannot be
   *                      updated, or the store is closed
   */
  public synchronized void removeMessage(MessageIdentity identity, MessageAck ack) throws JMSException {
    String messageID = identity.getMessageID();
    if (messageID == null) {
      throw new JMSException("Cannot remove message with null messageID for sequence number: " + identity.getSequenceNumber());
    }
    try {
      Object sequenceNumber = identity.getSequenceNumber();
      if (sequenceNumber == null) {
        sequenceNumber = findSequenceNumber(messageID);
        identity.setSequenceNumber(sequenceNumber);
      }
      getMessageTable().remove(messageID);
      if (sequenceNumber != null) {
        getOrderedIndex().remove(sequenceNumber);
      }
      else {
        // The scan found no index entry for this message. A null key is not a
        // valid BTree key, so there is nothing to remove from the index.
        log.warn("No sequence number found for messageID: " + messageID + "; skipping ordered index removal");
      }
    }
    catch (IOException e) {
      throw JMSExceptionHelper.newJMSException("Failed to delete message for messageID: " + messageID + " " + e, e);
    }
  }

  /**
   * Replays every entry of the ordered index, in key order, into the given
   * container via {@code recoverMessageToBeDelivered}.
   *
   * @param container the queue container receiving the recovered identities
   * @throws JMSException if the ordered index cannot be read, or the store is closed
   */
  public synchronized void recover(QueueMessageContainer container) throws JMSException {
    try {
      Tuple tuple = new Tuple();
      TupleBrowser iter = getOrderedIndex().browse();
      while (iter.getNext(tuple)) {
        Long key = (Long) tuple.getKey();
        MessageIdentity messageIdentity = null;
        if (key != null) {
          String messageID = (String) tuple.getValue();
          if (messageID != null) {
            messageIdentity = new MessageIdentity(messageID, key);
          }
        }
        if (messageIdentity != null) {
          container.recoverMessageToBeDelivered(messageIdentity);
        }
        else {
          log.warn("Could not find message for sequenceNumber: " + key);
        }
      }
    }
    catch (IOException e) {
      throw JMSExceptionHelper.newJMSException("Failed to recover the durable queue store. Reason: " + e, e);
    }
  }

  /**
   * Initialises {@code lastSequenceNumber} from the greatest key currently in
   * the ordered index, so newly added messages continue the sequence.
   *
   * @throws JMSException if the ordered index cannot be read, or the store is closed
   */
  public synchronized void start() throws JMSException
  {
    try {
      Tuple tuple = new Tuple();
      Long lastKey = null;
      // browse(null) positions the browser after the greatest entry, so a
      // single getPrevious() yields the last key without scanning the index.
      TupleBrowser iter = getOrderedIndex().browse(null);
      if (iter.getPrevious(tuple)) {
        lastKey = (Long) tuple.getKey();
      }
      if (lastKey != null) {
        this.lastSequenceNumber = lastKey.longValue();
        if (log.isDebugEnabled()) {
          log.debug("Last sequence number is: " + lastKey + " for: " + this);
        }
      }
      else if (log.isDebugEnabled()) {
        log.debug("Started empty database for: " + this);
      }
    }
    catch (IOException e) {
      throw JMSExceptionHelper.newJMSException("Failed to find the last sequence number. Reason: " + e, e);
    }
  }

  /**
   * Releases both tables. Subsequent table access throws
   * {@link AlreadyClosedException}.
   *
   * @throws JMSException the first exception reported by
   *                      {@link #closeTable(BTree, JMSException)}, if any
   */
  public synchronized void stop() throws JMSException {
    JMSException firstException = closeTable(this.orderedIndex, null);
    firstException = closeTable(this.messageTable, firstException);
    this.orderedIndex = null;
    this.messageTable = null;
    if (firstException != null) {
      throw firstException;
    }
  }

  /** @return the container set via {@link #setMessageContainer(MessageContainer)}, possibly {@code null} */
  protected MessageContainer getContainer()
  {
    return this.container;
  }

  /** @return the highest sequence number assigned or recovered so far */
  protected long getLastSequenceNumber() {
    return this.lastSequenceNumber;
  }

  /**
   * @return the message table
   * @throws AlreadyClosedException if the store has been stopped
   */
  protected BTree getMessageTable() throws AlreadyClosedException {
    if (this.messageTable == null) {
      throw new AlreadyClosedException("JDBM MessageStore");
    }
    return this.messageTable;
  }

  /**
   * @return the ordered index
   * @throws AlreadyClosedException if the store has been stopped
   */
  protected BTree getOrderedIndex() throws AlreadyClosedException {
    if (this.orderedIndex == null) {
      throw new AlreadyClosedException("JDBM MessageStore");
    }
    return this.orderedIndex;
  }

  /**
   * Resolves a sequence number to its message via the ordered index.
   *
   * @param sequenceNumber key into the ordered index
   * @return the message, or {@code null} if the index has no entry for the
   *         sequence number or the message table has no entry for its ID
   * @throws IOException  if the ordered index cannot be read
   * @throws JMSException if the message lookup fails, or the store is closed
   */
  protected ActiveMQMessage getMessageBySequenceNumber(Long sequenceNumber)
    throws IOException, JMSException
  {
    ActiveMQMessage message = null;
    String messageID = (String) getOrderedIndex().find(sequenceNumber);
    if (messageID != null) {
      message = getMessage(new MessageIdentity(messageID, sequenceNumber));
    }
    return message;
  }

  /**
   * Finds the sequence number for a message ID by scanning the whole ordered
   * index (the index is keyed by sequence number, not by message ID).
   *
   * @param messageID the message ID to search for; must not be {@code null}
   * @return the matching sequence number, or {@code null} if none is found
   * @throws IOException            if the ordered index cannot be read
   * @throws AlreadyClosedException if the store has been stopped
   */
  protected Object findSequenceNumber(String messageID)
    throws IOException, AlreadyClosedException
  {
    log.warn("Having to table scan to find the sequence number for messageID: " + messageID);

    Tuple tuple = new Tuple();
    TupleBrowser iter = getOrderedIndex().browse();
    while (iter.getNext(tuple)) {
      Object value = tuple.getValue();
      if (messageID.equals(value)) {
        return tuple.getKey();
      }
    }
    return null;
  }

  /**
   * Hook for closing a single table while preserving the first failure seen.
   * This implementation performs no close operation on the table itself.
   * NOTE(review): presumably the underlying JDBM record manager is closed by
   * whoever created the BTrees - confirm against the owning adapter.
   *
   * @param table          the table being released (unused here)
   * @param firstException the first exception raised by an earlier close, or {@code null}
   * @return {@code firstException} unchanged, so an earlier failure is not lost
   */
  protected JMSException closeTable(BTree table, JMSException firstException) {
    return firstException;
  }
}